package r3.zookeeper;

import lombok.extern.slf4j.Slf4j;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.TreeCache;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.apache.curator.framework.recipes.cache.TreeCacheListener;
import org.apache.curator.retry.RetryForever;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
import r3.common.R3Url;
import r3.registry.RegistryCenter;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;

/**
 * ZookeeperRegistryCenter
 *
 * @author zhoufn
 * @create 2017-12-25 21:09
 **/
@Slf4j
public class ZookeeperRegistryCenter implements RegistryCenter {

    /**
     * zookeeper连接地址
     */
    private String address;

    /**
     * zookeeper命名空间
     */
    private String namespace;

    /**
     * 回话的超时时间
     */
    private int sessionTimeout;

    /**
     * 连接的超时时间
     */
    private int connectionTimeout;

    /**
     * 由于某种未知原因，导致zookeeper节点丢失所以提供了循环注册
     */
    private int interval;

    /**
     * 当前注册的服务列表的快照
     */
    private ArrayList<R3Url> registryServices = new ArrayList<>();

    /**
     * 服务路径正则
     * /r3.example.api.SayHelloService/workers/worker-1/sayHelloWorker@192.168.2.173:20080
     */
    private String service_reg = "^\\/.*/.*/[a-zA-Z]{1,}@.*[0-9]{1,}$";


    /**
     * 维护当前zookeeper上服务的快照
     * 快照格式
     * HashMap<interface,HashMap<applicaionName,HashMap<zookeeperPath,R3Url>>>
     */
    private HashMap<String,HashMap<String,HashMap<String,R3Url>>> registerSnapshot = new HashMap<>();


    public ZookeeperRegistryCenter(String address, String namespace, int sessionTimeout, int connectionTimeout,int interval) {
        this.address = address;
        this.namespace = namespace;
        this.sessionTimeout = sessionTimeout;
        this.connectionTimeout = connectionTimeout;
        this.interval = interval;
    }

    /**
     * curator client
     */
    private CuratorFramework client;

    /**
     * 初始化
     */
    @Override
    public void init() throws Exception {
        this.client = CuratorFrameworkFactory.builder()
                .connectString(this.address)
                .connectionTimeoutMs(this.connectionTimeout)
                .sessionTimeoutMs(this.sessionTimeout)
                .namespace(this.namespace)
                .retryPolicy(new RetryForever(5000))
                .build();
        this.client.start();
        this.initRegistrySnapshot();
        this.addListener();
        new Thread(() -> {
            while (true) {
                try {
                    for (R3Url r3Url : registryServices) {
                        onlyRegiste(r3Url);
                    }
                    Thread.sleep(interval);
                }catch (Exception e){
                    log.error("ZookeeperRegistryCenter registry error",e);
                }
            }
        }).start();
        log.debug("ZookeeperRegistryCenter init successful...");
    }


    /**
     * 初始化注册服务
     *
     * @throws Exception
     */
    private void initRegistrySnapshot() throws Exception {
        List<String> interfces = this.client.getChildren().forPath("/");
        for (String interfce : interfces) {
            List<String> workers = this.client.getChildren().forPath("/" + interfce + "/workers");
            for (String worker : workers) {
                List<String> leafNodes = this.client.getChildren().forPath("/" + interfce + "/workers/" + worker);
                for (String leafNode : leafNodes) {
                    String path = "/" + interfce + "/workers/" + worker + "/" + leafNode;
                    R3Url r3Url = R3Url.parse(path);
                    this.registeSnapshot(path,r3Url);
                }
            }
        }
    }

    /**
     * 对当前的curator client添加监听事件，用来维护本地的zookeeper服务节点快照
     *
     * @throws Exception
     */
    private void addListener() {
       new Thread(()->{
           if (client == null) {
               return;
           }
           try {
               TreeCache treeCache = new TreeCache(client, "/");
               TreeCacheListener treeCacheListener = (CuratorFramework client, TreeCacheEvent event) -> {
                   ChildData childData = event.getData();
                   switch (event.getType()) {
                       case NODE_ADDED:
                           log.debug("NODE_ADDED >> " + childData.getPath());
                           this.addEvent(childData);
                           break;
                       case NODE_REMOVED:
                           log.debug("NODE_REMOVED >> " + childData.getPath());
                           this.removeEvent(childData);
                           break;
                       case NODE_UPDATED:
                           log.debug("NODE_UPDATED >> " + childData.getPath());
                           break;
                       default:
                           break;
                   }
               };
               treeCache.getListenable().addListener(treeCacheListener);
               treeCache.start();
           } catch (Exception e) {
               log.error("ZookeeperRegistryCenter addListener error.", e);
           }
       }).start();
    }

    /**
     * 节点移除事件
     * @param data
     * @throws Exception
     */
    private void removeEvent(ChildData data) throws Exception{
        if(data == null || !data.getPath().matches(this.service_reg)){
            return;
        }
        R3Url url = R3Url.parse(data.getPath());
        if(this.registerSnapshot.containsKey(url.getInterfce())){
            if(this.registerSnapshot.get(url.getInterfce()).containsKey(url.getApplicaionName())){
                if(this.registerSnapshot.get(url.getInterfce()).get(url.getApplicaionName()).containsKey(data.getPath())){
                    this.registerSnapshot.get(url.getInterfce()).get(url.getApplicaionName()).remove(data.getPath());
                }
            }
        }
    }

    /**
     * 新节点注册事件
     *  /r3.example.api.SayHelloService/workers/worker-1/sayHelloWorker@192.168.2.173:20080
     * @param data
     * @throws Exception
     */
    private void addEvent(ChildData data) throws Exception {
        if(data == null){
            return;
        }
        if(!data.getPath().matches(this.service_reg)){
            return;
        }
        R3Url url = R3Url.parse(data.getPath());
        this.registeSnapshot(data.getPath(),url);
    }

    /**
     * 本地快照内注册R3url
     * @param url
     */
    private void registeSnapshot(String path,R3Url url){
        HashMap<String,HashMap<String,R3Url>> applicationSnapshot;
        HashMap<String,R3Url> urlSnapshot;
        if(this.registerSnapshot.containsKey(url.getInterfce())){
            applicationSnapshot = this.registerSnapshot.get(url.getInterfce());
            if(applicationSnapshot.containsKey(url.getApplicaionName())){
                urlSnapshot = applicationSnapshot.get(url.getApplicaionName());
                if(!urlSnapshot.containsKey(path)){
                    urlSnapshot.put(path,url);
                }
            }else{
                urlSnapshot = new HashMap<>();
                urlSnapshot.put(path,url);
                applicationSnapshot.put(url.getApplicaionName(),urlSnapshot);
            }
        }else{
            applicationSnapshot = new HashMap<>();
            urlSnapshot = new HashMap<>();
            urlSnapshot.put(path,url);
            applicationSnapshot.put(url.getApplicaionName(),urlSnapshot);
            this.registerSnapshot.put(url.getInterfce(),applicationSnapshot);
        }
    }

    /**
     * 关闭
     */
    @Override
    public void close() {
        this.client.close();
    }


    /**
     * 注册
     *
     * @param url
     */
    @Override
    public void registe(R3Url url) throws Exception {
        this.onlyRegiste(url);
        this.registryServices.add(url);
    }

    /**
     * 只注册
     * @param url
     * @throws Exception
     */
    private void onlyRegiste(R3Url url) throws Exception{
        String basePath = "/" + url.getInterfce() + "/workers/" + url.getApplicaionName();
        String nodePath = "/" + url.getBeanId() + "@" + url.getApplicationHost() + ":" + url.getApplicationPort();
        this.createNode(basePath, nodePath);
    }

    /**
     * 注册服务到zookeeper
     * @param basePath
     * @param nodePath
     * @throws Exception
     */
    private void createNode(String basePath, String nodePath) throws Exception {
        Stat stat = this.client.checkExists().forPath(basePath);
        if (stat == null)
            this.client.create().creatingParentsIfNeeded().forPath(basePath);
        stat = this.client.checkExists().forPath(basePath + nodePath);
        if (stat == null)
            this.client.create().withMode(CreateMode.EPHEMERAL).forPath(basePath + nodePath);
    }

    /**
     * 发现
     *
     * @param interfce
     * @return applicationName + List
     */
    @Override
    public HashMap<String,HashMap<String,R3Url>> discover(String interfce) throws Exception {
       return this.registerSnapshot.get(interfce);
    }

}
